1 /*
2 * Copyright (C) 2012 The Guava Authors
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16 package com.google.common.util.concurrent;
17
18 import static com.google.common.base.Preconditions.checkArgument;
19 import static com.google.common.base.Preconditions.checkNotNull;
20 import static com.google.common.base.Preconditions.checkState;
21 import static com.google.common.base.Predicates.equalTo;
22 import static com.google.common.base.Predicates.in;
23 import static com.google.common.base.Predicates.instanceOf;
24 import static com.google.common.base.Predicates.not;
25 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
26 import static com.google.common.util.concurrent.Service.State.FAILED;
27 import static com.google.common.util.concurrent.Service.State.NEW;
28 import static com.google.common.util.concurrent.Service.State.RUNNING;
29 import static com.google.common.util.concurrent.Service.State.STARTING;
30 import static com.google.common.util.concurrent.Service.State.STOPPING;
31 import static com.google.common.util.concurrent.Service.State.TERMINATED;
32 import static java.util.concurrent.TimeUnit.MILLISECONDS;
33
34 import com.google.common.annotations.Beta;
35 import com.google.common.base.Function;
36 import com.google.common.base.MoreObjects;
37 import com.google.common.base.Stopwatch;
38 import com.google.common.base.Supplier;
39 import com.google.common.collect.Collections2;
40 import com.google.common.collect.ImmutableCollection;
41 import com.google.common.collect.ImmutableList;
42 import com.google.common.collect.ImmutableMap;
43 import com.google.common.collect.ImmutableMultimap;
44 import com.google.common.collect.ImmutableSet;
45 import com.google.common.collect.ImmutableSetMultimap;
46 import com.google.common.collect.Lists;
47 import com.google.common.collect.Maps;
48 import com.google.common.collect.Multimaps;
49 import com.google.common.collect.Multiset;
50 import com.google.common.collect.Ordering;
51 import com.google.common.collect.SetMultimap;
52 import com.google.common.collect.Sets;
53 import com.google.common.util.concurrent.ListenerCallQueue.Callback;
54 import com.google.common.util.concurrent.Service.State;
55
56 import java.lang.ref.WeakReference;
57 import java.util.ArrayList;
58 import java.util.Collection;
59 import java.util.Collections;
60 import java.util.EnumMap;
61 import java.util.List;
62 import java.util.Map;
63 import java.util.Map.Entry;
64 import java.util.Set;
65 import java.util.concurrent.Executor;
66 import java.util.concurrent.TimeUnit;
67 import java.util.concurrent.TimeoutException;
68 import java.util.logging.Level;
69 import java.util.logging.Logger;
70
71 import javax.annotation.concurrent.GuardedBy;
72
73 /**
74 * A manager for monitoring and controlling a set of {@linkplain Service services}. This class
75 * provides methods for {@linkplain #startAsync() starting}, {@linkplain #stopAsync() stopping} and
76 * {@linkplain #servicesByState inspecting} a collection of {@linkplain Service services}.
77 * Additionally, users can monitor state transitions with the {@linkplain Listener listener}
78 * mechanism.
79 *
80 * <p>While it is recommended that service lifecycles be managed via this class, state transitions
81 * initiated via other mechanisms do not impact the correctness of its methods. For example, if the
82 * services are started by some mechanism besides {@link #startAsync}, the listeners will be invoked
83 * when appropriate and {@link #awaitHealthy} will still work as expected.
84 *
85 * <p>Here is a simple example of how to use a {@code ServiceManager} to start a server.
86 * <pre> {@code
87 * class Server {
88 * public static void main(String[] args) {
89 * Set<Service> services = ...;
90 * ServiceManager manager = new ServiceManager(services);
91 * manager.addListener(new Listener() {
92 * public void stopped() {}
93 * public void healthy() {
94 * // Services have been initialized and are healthy, start accepting requests...
95 * }
96 * public void failure(Service service) {
97 * // Something failed, at this point we could log it, notify a load balancer, or take
98 * // some other action. For now we will just exit.
99 * System.exit(1);
100 * }
101 * },
102 * MoreExecutors.directExecutor());
103 *
104 * Runtime.getRuntime().addShutdownHook(new Thread() {
105 * public void run() {
106 * // Give the services 5 seconds to stop to ensure that we are responsive to shutdown
107 * // requests.
108 * try {
109 * manager.stopAsync().awaitStopped(5, TimeUnit.SECONDS);
110 * } catch (TimeoutException timeout) {
111 * // stopping timed out
112 * }
113 * }
114 * });
115 * manager.startAsync(); // start all the services asynchronously
116 * }
117 * }}</pre>
118 *
119 * <p>This class uses the ServiceManager's methods to start all of its services, to respond to
120 * service failure and to ensure that when the JVM is shutting down all the services are stopped.
121 *
122 * @author Luke Sandberg
123 * @since 14.0
124 */
125 @Beta
126 public final class ServiceManager {
127 private static final Logger logger = Logger.getLogger(ServiceManager.class.getName());
128 private static final Callback<Listener> HEALTHY_CALLBACK = new Callback<Listener>("healthy()") {
129 @Override void call(Listener listener) {
130 listener.healthy();
131 }
132 };
133 private static final Callback<Listener> STOPPED_CALLBACK = new Callback<Listener>("stopped()") {
134 @Override void call(Listener listener) {
135 listener.stopped();
136 }
137 };
138
139 /**
140 * A listener for the aggregate state changes of the services that are under management. Users
141 * that need to listen to more fine-grained events (such as when each particular {@linkplain
142 * Service service} starts, or terminates), should attach {@linkplain Service.Listener service
143 * listeners} to each individual service.
144 *
145 * @author Luke Sandberg
146 * @since 15.0 (present as an interface in 14.0)
147 */
148 @Beta // Should come out of Beta when ServiceManager does
149 public abstract static class Listener {
150 /**
151 * Called when the service initially becomes healthy.
152 *
153 * <p>This will be called at most once after all the services have entered the
154 * {@linkplain State#RUNNING running} state. If any services fail during start up or
155 * {@linkplain State#FAILED fail}/{@linkplain State#TERMINATED terminate} before all other
156 * services have started {@linkplain State#RUNNING running} then this method will not be called.
157 */
158 public void healthy() {}
159
160 /**
161 * Called when the all of the component services have reached a terminal state, either
162 * {@linkplain State#TERMINATED terminated} or {@linkplain State#FAILED failed}.
163 */
164 public void stopped() {}
165
166 /**
167 * Called when a component service has {@linkplain State#FAILED failed}.
168 *
169 * @param service The service that failed.
170 */
171 public void failure(Service service) {}
172 }
173
174 /**
175 * An encapsulation of all of the state that is accessed by the {@linkplain ServiceListener
176 * service listeners}. This is extracted into its own object so that {@link ServiceListener}
177 * could be made {@code static} and its instances can be safely constructed and added in the
178 * {@link ServiceManager} constructor without having to close over the partially constructed
179 * {@link ServiceManager} instance (i.e. avoid leaking a pointer to {@code this}).
180 */
181 private final ServiceManagerState state;
182 private final ImmutableList<Service> services;
183
184 /**
185 * Constructs a new instance for managing the given services.
186 *
187 * @param services The services to manage
188 *
189 * @throws IllegalArgumentException if not all services are {@linkplain State#NEW new} or if there
190 * are any duplicate services.
191 */
192 public ServiceManager(Iterable<? extends Service> services) {
193 ImmutableList<Service> copy = ImmutableList.copyOf(services);
194 if (copy.isEmpty()) {
195 // Having no services causes the manager to behave strangely. Notably, listeners are never
196 // fired. To avoid this we substitute a placeholder service.
197 logger.log(Level.WARNING,
198 "ServiceManager configured with no services. Is your application configured properly?",
199 new EmptyServiceManagerWarning());
200 copy = ImmutableList.<Service>of(new NoOpService());
201 }
202 this.state = new ServiceManagerState(copy);
203 this.services = copy;
204 WeakReference<ServiceManagerState> stateReference =
205 new WeakReference<ServiceManagerState>(state);
206 for (Service service : copy) {
207 service.addListener(new ServiceListener(service, stateReference), directExecutor());
208 // We check the state after adding the listener as a way to ensure that our listener was added
209 // to a NEW service.
210 checkArgument(service.state() == NEW, "Can only manage NEW services, %s", service);
211 }
212 // We have installed all of our listeners and after this point any state transition should be
213 // correct.
214 this.state.markReady();
215 }
216
217 /**
218 * Registers a {@link Listener} to be {@linkplain Executor#execute executed} on the given
219 * executor. The listener will not have previous state changes replayed, so it is
220 * suggested that listeners are added before any of the managed services are
221 * {@linkplain Service#startAsync started}.
222 *
223 * <p>{@code addListener} guarantees execution ordering across calls to a given listener but not
224 * across calls to multiple listeners. Specifically, a given listener will have its callbacks
225 * invoked in the same order as the underlying service enters those states. Additionally, at most
226 * one of the listener's callbacks will execute at once. However, multiple listeners' callbacks
227 * may execute concurrently, and listeners may execute in an order different from the one in which
228 * they were registered.
229 *
230 * <p>RuntimeExceptions thrown by a listener will be caught and logged. Any exception thrown
231 * during {@code Executor.execute} (e.g., a {@code RejectedExecutionException}) will be caught and
232 * logged.
233 *
234 * <p> For fast, lightweight listeners that would be safe to execute in any thread, consider
235 * calling {@link #addListener(Listener)}.
236 *
237 * @param listener the listener to run when the manager changes state
238 * @param executor the executor in which the listeners callback methods will be run.
239 */
240 public void addListener(Listener listener, Executor executor) {
241 state.addListener(listener, executor);
242 }
243
244 /**
245 * Registers a {@link Listener} to be run when this {@link ServiceManager} changes state. The
246 * listener will not have previous state changes replayed, so it is suggested that listeners are
247 * added before any of the managed services are {@linkplain Service#startAsync started}.
248 *
249 * <p>{@code addListener} guarantees execution ordering across calls to a given listener but not
250 * across calls to multiple listeners. Specifically, a given listener will have its callbacks
251 * invoked in the same order as the underlying service enters those states. Additionally, at most
252 * one of the listener's callbacks will execute at once. However, multiple listeners' callbacks
253 * may execute concurrently, and listeners may execute in an order different from the one in which
254 * they were registered.
255 *
256 * <p>RuntimeExceptions thrown by a listener will be caught and logged.
257 *
258 * @param listener the listener to run when the manager changes state
259 */
260 public void addListener(Listener listener) {
261 state.addListener(listener, directExecutor());
262 }
263
264 /**
265 * Initiates service {@linkplain Service#startAsync startup} on all the services being managed.
266 * It is only valid to call this method if all of the services are {@linkplain State#NEW new}.
267 *
268 * @return this
269 * @throws IllegalStateException if any of the Services are not {@link State#NEW new} when the
270 * method is called.
271 */
272 public ServiceManager startAsync() {
273 for (Service service : services) {
274 State state = service.state();
275 checkState(state == NEW, "Service %s is %s, cannot start it.", service, state);
276 }
277 for (Service service : services) {
278 try {
279 state.tryStartTiming(service);
280 service.startAsync();
281 } catch (IllegalStateException e) {
282 // This can happen if the service has already been started or stopped (e.g. by another
283 // service or listener). Our contract says it is safe to call this method if
284 // all services were NEW when it was called, and this has already been verified above, so we
285 // don't propagate the exception.
286 logger.log(Level.WARNING, "Unable to start Service " + service, e);
287 }
288 }
289 return this;
290 }
291
292 /**
293 * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy}. The manager
294 * will become healthy after all the component services have reached the {@linkplain State#RUNNING
295 * running} state.
296 *
297 * @throws IllegalStateException if the service manager reaches a state from which it cannot
298 * become {@linkplain #isHealthy() healthy}.
299 */
300 public void awaitHealthy() {
301 state.awaitHealthy();
302 }
303
304 /**
305 * Waits for the {@link ServiceManager} to become {@linkplain #isHealthy() healthy} for no more
306 * than the given time. The manager will become healthy after all the component services have
307 * reached the {@linkplain State#RUNNING running} state.
308 *
309 * @param timeout the maximum time to wait
310 * @param unit the time unit of the timeout argument
311 * @throws TimeoutException if not all of the services have finished starting within the deadline
312 * @throws IllegalStateException if the service manager reaches a state from which it cannot
313 * become {@linkplain #isHealthy() healthy}.
314 */
315 public void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
316 state.awaitHealthy(timeout, unit);
317 }
318
319 /**
320 * Initiates service {@linkplain Service#stopAsync shutdown} if necessary on all the services
321 * being managed.
322 *
323 * @return this
324 */
325 public ServiceManager stopAsync() {
326 for (Service service : services) {
327 service.stopAsync();
328 }
329 return this;
330 }
331
332 /**
333 * Waits for the all the services to reach a terminal state. After this method returns all
334 * services will either be {@linkplain Service.State#TERMINATED terminated} or {@linkplain
335 * Service.State#FAILED failed}.
336 */
337 public void awaitStopped() {
338 state.awaitStopped();
339 }
340
341 /**
342 * Waits for the all the services to reach a terminal state for no more than the given time. After
343 * this method returns all services will either be {@linkplain Service.State#TERMINATED
344 * terminated} or {@linkplain Service.State#FAILED failed}.
345 *
346 * @param timeout the maximum time to wait
347 * @param unit the time unit of the timeout argument
348 * @throws TimeoutException if not all of the services have stopped within the deadline
349 */
350 public void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
351 state.awaitStopped(timeout, unit);
352 }
353
354 /**
355 * Returns true if all services are currently in the {@linkplain State#RUNNING running} state.
356 *
357 * <p>Users who want more detailed information should use the {@link #servicesByState} method to
358 * get detailed information about which services are not running.
359 */
360 public boolean isHealthy() {
361 for (Service service : services) {
362 if (!service.isRunning()) {
363 return false;
364 }
365 }
366 return true;
367 }
368
369 /**
370 * Provides a snapshot of the current state of all the services under management.
371 *
372 * <p>N.B. This snapshot is guaranteed to be consistent, i.e. the set of states returned will
373 * correspond to a point in time view of the services.
374 */
375 public ImmutableMultimap<State, Service> servicesByState() {
376 return state.servicesByState();
377 }
378
379 /**
380 * Returns the service load times. This value will only return startup times for services that
381 * have finished starting.
382 *
383 * @return Map of services and their corresponding startup time in millis, the map entries will be
384 * ordered by startup time.
385 */
386 public ImmutableMap<Service, Long> startupTimes() {
387 return state.startupTimes();
388 }
389
390 @Override public String toString() {
391 return MoreObjects.toStringHelper(ServiceManager.class)
392 .add("services", Collections2.filter(services, not(instanceOf(NoOpService.class))))
393 .toString();
394 }
395
396 /**
397 * An encapsulation of all the mutable state of the {@link ServiceManager} that needs to be
398 * accessed by instances of {@link ServiceListener}.
399 */
400 private static final class ServiceManagerState {
401 final Monitor monitor = new Monitor();
402
403 @GuardedBy("monitor")
404 final SetMultimap<State, Service> servicesByState =
405 Multimaps.newSetMultimap(new EnumMap<State, Collection<Service>>(State.class),
406 new Supplier<Set<Service>>() {
407 @Override public Set<Service> get() {
408 return Sets.newLinkedHashSet();
409 }
410 });
411
412 @GuardedBy("monitor")
413 final Multiset<State> states = servicesByState.keys();
414
415 @GuardedBy("monitor")
416 final Map<Service, Stopwatch> startupTimers = Maps.newIdentityHashMap();
417
418 /**
419 * These two booleans are used to mark the state as ready to start.
420 * {@link #ready}: is set by {@link #markReady} to indicate that all listeners have been
421 * correctly installed
422 * {@link #transitioned}: is set by {@link #transitionService} to indicate that some transition
423 * has been performed.
424 *
425 * <p>Together, they allow us to enforce that all services have their listeners installed prior
426 * to any service performing a transition, then we can fail in the ServiceManager constructor
427 * rather than in a Service.Listener callback.
428 */
429 @GuardedBy("monitor")
430 boolean ready;
431
432 @GuardedBy("monitor")
433 boolean transitioned;
434
435 final int numberOfServices;
436
437 /**
438 * Controls how long to wait for all the services to either become healthy or reach a
439 * state from which it is guaranteed that it can never become healthy.
440 */
441 final Monitor.Guard awaitHealthGuard = new Monitor.Guard(monitor) {
442 @Override public boolean isSatisfied() {
443 // All services have started or some service has terminated/failed.
444 return states.count(RUNNING) == numberOfServices
445 || states.contains(STOPPING)
446 || states.contains(TERMINATED)
447 || states.contains(FAILED);
448 }
449 };
450
451 /**
452 * Controls how long to wait for all services to reach a terminal state.
453 */
454 final Monitor.Guard stoppedGuard = new Monitor.Guard(monitor) {
455 @Override public boolean isSatisfied() {
456 return states.count(TERMINATED) + states.count(FAILED) == numberOfServices;
457 }
458 };
459
460 /** The listeners to notify during a state transition. */
461 @GuardedBy("monitor")
462 final List<ListenerCallQueue<Listener>> listeners =
463 Collections.synchronizedList(new ArrayList<ListenerCallQueue<Listener>>());
464
/**
 * Records every given service as {@linkplain State#NEW new}.
 *
 * <p>It is implicitly assumed that all the services are NEW and that they will all remain NEW
 * until all the Listeners are installed and {@link #markReady()} is called. It is our caller's
 * responsibility to only call {@link #markReady()} if all services were new at the time this
 * method was called and when all the listeners were installed.
 */
ServiceManagerState(ImmutableCollection<Service> services) {
  servicesByState.putAll(NEW, services);
  this.numberOfServices = services.size();
}
475
476 /**
477 * Attempts to start the timer immediately prior to the service being started via
478 * {@link Service#startAsync()}.
479 */
480 void tryStartTiming(Service service) {
481 monitor.enter();
482 try {
483 Stopwatch stopwatch = startupTimers.get(service);
484 if (stopwatch == null) {
485 startupTimers.put(service, Stopwatch.createStarted());
486 }
487 } finally {
488 monitor.leave();
489 }
490 }
491
492 /**
493 * Marks the {@link State} as ready to receive transitions. Returns true if no transitions have
494 * been observed yet.
495 */
496 void markReady() {
497 monitor.enter();
498 try {
499 if (!transitioned) {
500 // nothing has transitioned since construction, good.
501 ready = true;
502 } else {
503 // This should be an extremely rare race condition.
504 List<Service> servicesInBadStates = Lists.newArrayList();
505 for (Service service : servicesByState().values()) {
506 if (service.state() != NEW) {
507 servicesInBadStates.add(service);
508 }
509 }
510 throw new IllegalArgumentException("Services started transitioning asynchronously before "
511 + "the ServiceManager was constructed: " + servicesInBadStates);
512 }
513 } finally {
514 monitor.leave();
515 }
516 }
517
518 void addListener(Listener listener, Executor executor) {
519 checkNotNull(listener, "listener");
520 checkNotNull(executor, "executor");
521 monitor.enter();
522 try {
523 // no point in adding a listener that will never be called
524 if (!stoppedGuard.isSatisfied()) {
525 listeners.add(new ListenerCallQueue<Listener>(listener, executor));
526 }
527 } finally {
528 monitor.leave();
529 }
530 }
531
532 void awaitHealthy() {
533 monitor.enterWhenUninterruptibly(awaitHealthGuard);
534 try {
535 checkHealthy();
536 } finally {
537 monitor.leave();
538 }
539 }
540
541 void awaitHealthy(long timeout, TimeUnit unit) throws TimeoutException {
542 monitor.enter();
543 try {
544 if (!monitor.waitForUninterruptibly(awaitHealthGuard, timeout, unit)) {
545 throw new TimeoutException("Timeout waiting for the services to become healthy. The "
546 + "following services have not started: "
547 + Multimaps.filterKeys(servicesByState, in(ImmutableSet.of(NEW, STARTING))));
548 }
549 checkHealthy();
550 } finally {
551 monitor.leave();
552 }
553 }
554
555 void awaitStopped() {
556 monitor.enterWhenUninterruptibly(stoppedGuard);
557 monitor.leave();
558 }
559
560 void awaitStopped(long timeout, TimeUnit unit) throws TimeoutException {
561 monitor.enter();
562 try {
563 if (!monitor.waitForUninterruptibly(stoppedGuard, timeout, unit)) {
564 throw new TimeoutException("Timeout waiting for the services to stop. The following "
565 + "services have not stopped: "
566 + Multimaps.filterKeys(servicesByState,
567 not(in(ImmutableSet.of(TERMINATED, FAILED)))));
568 }
569 } finally {
570 monitor.leave();
571 }
572 }
573
574 ImmutableMultimap<State, Service> servicesByState() {
575 ImmutableSetMultimap.Builder<State, Service> builder = ImmutableSetMultimap.builder();
576 monitor.enter();
577 try {
578 for (Entry<State, Service> entry : servicesByState.entries()) {
579 if (!(entry.getValue() instanceof NoOpService)) {
580 builder.put(entry.getKey(), entry.getValue());
581 }
582 }
583 } finally {
584 monitor.leave();
585 }
586 return builder.build();
587 }
588
589 ImmutableMap<Service, Long> startupTimes() {
590 List<Entry<Service, Long>> loadTimes;
591 monitor.enter();
592 try {
593 loadTimes = Lists.newArrayListWithCapacity(startupTimers.size());
594 // N.B. There will only be an entry in the map if the service has started
595 for (Entry<Service, Stopwatch> entry : startupTimers.entrySet()) {
596 Service service = entry.getKey();
597 Stopwatch stopWatch = entry.getValue();
598 if (!stopWatch.isRunning() && !(service instanceof NoOpService)) {
599 loadTimes.add(Maps.immutableEntry(service, stopWatch.elapsed(MILLISECONDS)));
600 }
601 }
602 } finally {
603 monitor.leave();
604 }
605 Collections.sort(loadTimes, Ordering.<Long>natural()
606 .onResultOf(new Function<Entry<Service, Long>, Long>() {
607 @Override public Long apply(Map.Entry<Service, Long> input) {
608 return input.getValue();
609 }
610 }));
611 ImmutableMap.Builder<Service, Long> builder = ImmutableMap.builder();
612 for (Entry<Service, Long> entry : loadTimes) {
613 builder.put(entry);
614 }
615 return builder.build();
616 }
617
618 /**
619 * Updates the state with the given service transition.
620 *
621 * <p>This method performs the main logic of ServiceManager in the following steps.
622 * <ol>
623 * <li>Update the {@link #servicesByState()}
624 * <li>Update the {@link #startupTimers}
625 * <li>Based on the new state queue listeners to run
626 * <li>Run the listeners (outside of the lock)
627 * </ol>
628 */
629 void transitionService(final Service service, State from, State to) {
630 checkNotNull(service);
631 checkArgument(from != to);
632 monitor.enter();
633 try {
634 transitioned = true;
635 if (!ready) {
636 return;
637 }
638 // Update state.
639 checkState(servicesByState.remove(from, service),
640 "Service %s not at the expected location in the state map %s", service, from);
641 checkState(servicesByState.put(to, service),
642 "Service %s in the state map unexpectedly at %s", service, to);
643 // Update the timer
644 Stopwatch stopwatch = startupTimers.get(service);
645 if (stopwatch == null) {
646 // This means the service was started by some means other than ServiceManager.startAsync
647 stopwatch = Stopwatch.createStarted();
648 startupTimers.put(service, stopwatch);
649 }
650 if (to.compareTo(RUNNING) >= 0 && stopwatch.isRunning()) {
651 // N.B. if we miss the STARTING event then we may never record a startup time.
652 stopwatch.stop();
653 if (!(service instanceof NoOpService)) {
654 logger.log(Level.FINE, "Started {0} in {1}.", new Object[] {service, stopwatch});
655 }
656 }
657 // Queue our listeners
658
659 // Did a service fail?
660 if (to == FAILED) {
661 fireFailedListeners(service);
662 }
663
664 if (states.count(RUNNING) == numberOfServices) {
665 // This means that the manager is currently healthy. N.B. If other threads call isHealthy
666 // they are not guaranteed to get 'true', because any service could fail right now.
667 fireHealthyListeners();
668 } else if (states.count(TERMINATED) + states.count(FAILED) == numberOfServices) {
669 fireStoppedListeners();
670 }
671 } finally {
672 monitor.leave();
673 // Run our executors outside of the lock
674 executeListeners();
675 }
676 }
677
678 @GuardedBy("monitor")
679 void fireStoppedListeners() {
680 STOPPED_CALLBACK.enqueueOn(listeners);
681 }
682
683 @GuardedBy("monitor")
684 void fireHealthyListeners() {
685 HEALTHY_CALLBACK.enqueueOn(listeners);
686 }
687
688 @GuardedBy("monitor")
689 void fireFailedListeners(final Service service) {
690 new Callback<Listener>("failed({service=" + service + "})") {
691 @Override void call(Listener listener) {
692 listener.failure(service);
693 }
694 }.enqueueOn(listeners);
695 }
696
697 /** Attempts to execute all the listeners in {@link #listeners}. */
698 void executeListeners() {
699 checkState(!monitor.isOccupiedByCurrentThread(),
700 "It is incorrect to execute listeners with the monitor held.");
701 // iterate by index to avoid concurrent modification exceptions
702 for (int i = 0; i < listeners.size(); i++) {
703 listeners.get(i).execute();
704 }
705 }
706
707 @GuardedBy("monitor")
708 void checkHealthy() {
709 if (states.count(RUNNING) != numberOfServices) {
710 IllegalStateException exception = new IllegalStateException(
711 "Expected to be healthy after starting. The following services are not running: "
712 + Multimaps.filterKeys(servicesByState, not(equalTo(RUNNING))));
713 throw exception;
714 }
715 }
716 }
717
718 /**
719 * A {@link Service} that wraps another service and times how long it takes for it to start and
720 * also calls the {@link ServiceManagerState#transitionService(Service, State, State)},
721 * to record the state transitions.
722 */
723 private static final class ServiceListener extends Service.Listener {
724 final Service service;
725 // We store the state in a weak reference to ensure that if something went wrong while
726 // constructing the ServiceManager we don't pointlessly keep updating the state.
727 final WeakReference<ServiceManagerState> state;
728
729 ServiceListener(Service service, WeakReference<ServiceManagerState> state) {
730 this.service = service;
731 this.state = state;
732 }
733
734 @Override public void starting() {
735 ServiceManagerState state = this.state.get();
736 if (state != null) {
737 state.transitionService(service, NEW, STARTING);
738 if (!(service instanceof NoOpService)) {
739 logger.log(Level.FINE, "Starting {0}.", service);
740 }
741 }
742 }
743
744 @Override public void running() {
745 ServiceManagerState state = this.state.get();
746 if (state != null) {
747 state.transitionService(service, STARTING, RUNNING);
748 }
749 }
750
751 @Override public void stopping(State from) {
752 ServiceManagerState state = this.state.get();
753 if (state != null) {
754 state.transitionService(service, from, STOPPING);
755 }
756 }
757
758 @Override public void terminated(State from) {
759 ServiceManagerState state = this.state.get();
760 if (state != null) {
761 if (!(service instanceof NoOpService)) {
762 logger.log(Level.FINE, "Service {0} has terminated. Previous state was: {1}",
763 new Object[] {service, from});
764 }
765 state.transitionService(service, from, TERMINATED);
766 }
767 }
768
769 @Override public void failed(State from, Throwable failure) {
770 ServiceManagerState state = this.state.get();
771 if (state != null) {
772 // Log before the transition, so that if the process exits in response to server failure,
773 // there is a higher likelihood that the cause will be in the logs.
774 if (!(service instanceof NoOpService)) {
775 logger.log(Level.SEVERE, "Service " + service + " has failed in the " + from + " state.",
776 failure);
777 }
778 state.transitionService(service, from, FAILED);
779 }
780 }
781 }
782
783 /**
784 * A {@link Service} instance that does nothing. This is only useful as a placeholder to
785 * ensure that the {@link ServiceManager} functions properly even when it is managing no services.
786 *
787 * <p>The use of this class is considered an implementation detail of ServiceManager and as such
788 * it is excluded from {@link #servicesByState}, {@link #startupTimes}, {@link #toString} and all
789 * logging statements.
790 */
791 private static final class NoOpService extends AbstractService {
792 @Override protected void doStart() { notifyStarted(); }
793 @Override protected void doStop() { notifyStopped(); }
794 }
795
796 /** This is never thrown but only used for logging. */
797 private static final class EmptyServiceManagerWarning extends Throwable {}
798 }